In [1]:
import numpy as np
import pandas as pd
import time
import math
from nltk.corpus import stopwords

from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.ml.feature import Word2Vec
from pyspark.ml.clustering import KMeans
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import PCA

import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

import word2vecUtilities as wvu

Read Twitter Data as a Spark DataFrame


In [2]:
t0 = time.time()
datapath = '/Users/jorgecastanon/Documents/github/w2v/data/tweets.gz'
tweets = sqlContext.read.json(datapath)
tweets.registerTempTable("tweets")
twr = tweets.count()
print "Number of tweets read: ", twr 
# the count adds ~7 seconds (from ~24.5 seconds to ~31.5 seconds)
# Number of tweets read:  239082
print "Elapsed time (seconds): ", time.time() - t0
#Elapsed time (seconds):  31.9646401405


Number of tweets read:  239082
Elapsed time (seconds):  33.6140749454
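
To confirm which fields the JSON tweets expose before writing queries against the tweets table, the DataFrame schema is one line away (an optional check, not part of the original run):


In [ ]:
tweets.printSchema()  # prints the inferred JSON schema: one entry per field, with types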

Read Keywords: christmas, santa, turkey, ...


In [4]:
filterPath = '/Users/jorgecastanon/Documents/github/w2v/data/filter.txt'
keywords = pd.read_csv(filterPath, header=None)  # one keyword per line; named to avoid shadowing the built-in filter()
keywords.head()


Out[4]:
0
0 santa
1 claus
2 merry
3 christmas
4 eve

Use Spark SQL to Filter Tweets:

+ In English

+ And containing at least one keyword


In [5]:
# Construct SQL Command
t0 = time.time()
sqlString = "("
for substr in keywords[0]: # iterate over the keyword list (at most 50-100 words)
    sqlString = sqlString+"text LIKE '%"+substr+"%' OR "
    sqlString = sqlString+"text LIKE '%"+substr.upper()+"%' OR "
sqlString = sqlString[:-4]+")" # drop the trailing " OR "
sqlFilterCommand = "SELECT lang, text FROM tweets WHERE (lang = 'en') AND "+sqlString

# Query tweets in english that contain at least one of the keywords
tweetsDF = sqlContext.sql(sqlFilterCommand).cache()
twf = tweetsDF.count()
print "Number of tweets after filtering: ", twf 
# the count adds ~9 seconds (from ~0.72 seconds to ~9.42 seconds)
print "Elapsed time (seconds): ", time.time() - t0

print "Percetage of Tweets Used: ", float(twf)/twr


Number of tweets after filtering:  15999
Elapsed time (seconds):  10.3279871941
Fraction of tweets used:  0.0669184631214
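
Note that doubling every keyword into a lowercase and an UPPERCASE clause still misses mixed-case matches like 'Christmas', since LIKE is case-sensitive. A shorter, case-insensitive sketch (untested, not part of the original run) lowercases the text column once instead:


In [ ]:
# sketch: case-insensitive keyword filter via lower()
likeClauses = " OR ".join(["lower(text) LIKE '%" + w.lower() + "%'" for w in keywords[0]])
tweetsDF_ci = sqlContext.sql("SELECT lang, text FROM tweets WHERE lang = 'en' AND (" + likeClauses + ")")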

Parse Tweets and Remove Stop Words


In [6]:
tweetsRDD = tweetsDF.select('text').rdd

stop = set(stopwords.words('english'))  # build once; set membership tests are O(1)

def parseAndRemoveStopWords(text):
    t = text[0].replace(";"," ").replace(":"," ").replace('"',' ').replace('-',' ')
    t = t.replace(',',' ').replace('.',' ')
    t = t.lower().split(" ")
    return [i for i in t if i not in stop]

tw = tweetsRDD.map(parseAndRemoveStopWords)
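
The hand-listed punctuation above does not cover characters like '!' and '?', which is why tokens such as 'gift!' and 'eve!' survive into the vocabulary further down. A regex-based tokenizer is one possible refinement (a sketch, not what was run; note it also discards emoji and apostrophes, unlike the original parser):


In [ ]:
import re

def parseWithRegex(text):
    # split on runs of non-word characters, then drop empties and stop words
    return [w for w in re.split(r'\W+', text[0].lower()) if w and w not in stop]

# tw = tweetsRDD.map(parseWithRegex)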

Word2Vec: returns a DataFrame with words and their vectors

  • Occasionally this block needs to be run twice (the cause has not been debugged yet)

In [8]:
# map to df
twDF = tw.map(lambda p: Row(text=p)).toDF()

# default minCount = 5 (a larger value, say 20-100, would shrink the vocabulary and reduce cost)
# default vectorSize = 100 (we keep the default)
t0 = time.time()
word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol="text", outputCol="result")
modelW2V = word2Vec.fit(twDF)
wordVectorsDF = modelW2V.getVectors()
print "Elapsed time (seconds) to train Word2Vec: ", time.time() - t0


Elapsed time (seconds) to train Word2Vec:  8.02205491066
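
Beyond per-word vectors, the fitted model can also embed whole tweets: in spark.ml, Word2VecModel.transform averages the vectors of the words in each document into the output column. A quick optional check:


In [ ]:
tweetVectorsDF = modelW2V.transform(twDF)  # adds 'result': the average of the word vectors of each tweet
print tweetVectorsDF.select('result').first()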

In [9]:
print sc.version


1.5.1

In [10]:
vocabSize = wordVectorsDF.count()
print "Vocabulary Size: ", vocabSize


Vocabulary Size:  3954

Find top N closest words


In [11]:
topN = 13
synonymsDF = modelW2V.findSynonyms('christmas', topN).toPandas()
synonymsDF


Out[11]:
word similarity
0 eve 0.913197
1 gift! 0.809615
2 eve! 0.757804
3 😍😍😍 0.750454
4 cup 0.749197
5 indirect 0.734012
6 christmas' 0.729327
7 merry 0.728448
8 tomorrow 0.718660
9 wishing 0.703304
10 life? 0.702444
11 🎄 0.700686
12 xmas 0.697368

As Expected, Unrelated Terms Get Low Similarity Scores


In [12]:
synonymsDF = modelW2V.findSynonyms('dog', 5).toPandas()
synonymsDF


Out[12]:
word similarity
0 com 0.367090
1 bathroom 0.366653
2 talks 0.365062
3 too! 0.364378
4 points 0.360151

PCA on Top of Word2Vec using DF (spark.ml)


In [13]:
dfW2V = wordVectorsDF.select('vector').withColumnRenamed('vector','features')

numComponents = 3
pca = PCA(k = numComponents, inputCol = 'features', outputCol = 'pcaFeatures')
model = pca.fit(dfW2V)
dfComp = model.transform(dfW2V).select("pcaFeatures")
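
Each row of dfComp now holds a 3-dimensional vector, one per vocabulary word, in the same row order as wordVectorsDF. A quick optional look:


In [ ]:
print dfComp.first()  # a single pcaFeatures entry: a DenseVector of length 3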

3D Visualization


In [14]:
word = 'christmas'
nwords = 200

#############

r = wvu.topNwordsToPlot(dfComp,wordVectorsDF,word,nwords)

############
fs=20 #fontsize
w = r['word']
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

height = 10
width = 10
fig.set_size_inches(width, height)

ax.scatter(r['X'], r['Y'], r['Z'], color='red', s=100, marker='o', edgecolors='black')
for i, txt in enumerate(w):
    if i < 7:  # label only the first few points to keep the plot readable
        ax.text(r['X'].ix[i], r['Y'].ix[i], r['Z'].ix[i], '%s' % (txt), size=20, zorder=1, color='k')
ax.set_xlabel('1st. Component', fontsize=fs)
ax.set_ylabel('2nd. Component', fontsize=fs)
ax.set_zlabel('3rd. Component', fontsize=fs)
ax.set_title('Visualization of Word2Vec via PCA', fontsize=fs)
ax.grid(True)
plt.show()


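topNwordsToPlot comes from the local word2vecUtilities module, which is not shown in this notebook. For reference, a minimal hypothetical sketch of such a helper, assuming dfComp and wordVectorsDF share row order (they do here, since dfComp was derived from wordVectorsDF without a shuffle) and leaning on the global modelW2V for neighbor lookup:


In [ ]:
# hypothetical re-implementation of wvu.topNwordsToPlot, for reference only
def topNwordsToPlotSketch(dfComp, wordVectorsDF, word, nwords):
    # pull PCA coordinates and words to the driver; rows align by construction
    coords = np.array([row.pcaFeatures.toArray() for row in dfComp.collect()])
    words = [row.word for row in wordVectorsDF.select('word').collect()]
    df = pd.DataFrame({'word': words, 'X': coords[:, 0], 'Y': coords[:, 1], 'Z': coords[:, 2]})
    # keep the query word plus its (nwords - 1) nearest neighbors
    top = [word] + [row.word for row in modelW2V.findSynonyms(word, nwords - 1).collect()]
    return df[df.word.isin(top)].reset_index(drop=True)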

K-means on top of Word2Vec using DF (spark.ml)


In [20]:
t0=time.time()

# rule of thumb: K ~ sqrt(n/2), where n is the number of words in the model;
# with vocabSize = 3954 this gives K = floor(sqrt(1977)) = 44
# feel free to choose K with a fancier algorithm
K = int(math.floor(math.sqrt(float(vocabSize)/2)))

dfW2V = wordVectorsDF.select('vector').withColumnRenamed('vector','features')
kmeans = KMeans(k=K, seed=1)
modelK = kmeans.fit(dfW2V)
labelsDF = modelK.transform(dfW2V).select('prediction').withColumnRenamed('prediction','labels')

print "Number of Clusters (K) Used: ", K
print "Elapsed time (seconds) :", time.time() - t0


Number of Clusters (K) Used:  44
Elapsed time (seconds) : 1.38142800331
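
With the labels in hand, the clusters can be inspected. A sketch (not part of the original run), assuming row order is preserved between wordVectorsDF and labelsDF (it is here, since transform only adds a column, without shuffling), listing a few words that share a cluster with 'christmas':


In [ ]:
# sketch: inspect the cluster containing 'christmas'
wordsPD = wordVectorsDF.select('word').toPandas()
labelsPD = labelsDF.toPandas()
clustered = wordsPD.join(labelsPD)  # align by row position
xmasCluster = int(clustered[clustered.word == 'christmas'].labels)
print clustered[clustered.labels == xmasCluster].word.head(10)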
